[core] DataEvolution table supports concurrent updates to different columns#7867
Conversation
JingsongLi
left a comment
There was a problem hiding this comment.
There is a way not to add schemaId and writable Cols to SimpleFileEntry:
Do not add fields to SimpleFileEntry, but build RowIdColumnConflictChecker while still holding the complete ManifestEntry/DataFileMeta, and pass it to the conflict detection method
|
@JingsongLi Thanks! I think you are right! It's not wise to continuously broaden SimpleEntry. I've refactored the code |
JingsongLi
left a comment
There was a problem hiding this comment.
Review: RowIdColumnConflictChecker for Column-Level Conflict Detection
Nice improvement — the column-aware conflict detection is well-designed. The binary search approach gives excellent performance as shown by the benchmarks, and the field-ID-based comparison correctly handles schema evolution (column renames). A few observations:
Binary Search Correctness
The firstPossibleRange method searches for the first WriteRange whose to >= target.from. However, writeRanges is sorted by (from, to). This works correctly because:
- If a range's
to < target.from, it cannot overlap regardless of itsfrom - Finding the first range with
to >= target.fromis a valid starting point
But there's a subtle gap: after binary search, the linear scan uses writeRange.range.from > range.to as the early-exit condition. Since ranges are sorted by from, this is correct — once from exceeds the query's to, no subsequent ranges can overlap. Well done.
Thread Safety of fieldIdByNameCache
fieldIdByNameCache is a plain HashMap. This is fine as long as RowIdColumnConflictChecker instances are not shared across threads (they're constructed per commit attempt in FileStoreCommitImpl). Worth noting in a comment if the class might be reused in concurrent contexts in the future.
containsAnyWriteField with null writeCols on the checked file
When the committed incremental file has writeCols == null, containsAnyWriteField returns true immediately (line ~179). This is correct and conservative — a full-schema write can conflict with any partial write.
Merged Range Inflation
In testMergesOverlappedDeltaRangesAndWriteColumns: files [0,10] writing "b" and [5,15] writing "c" are merged into a single WriteRange [0,15] with fieldIds {b, c}. This means a file at range [12,12] writing only "b" will be flagged as conflicting, even though the actual "b" write only covers [0,10]. This is an over-approximation — acceptable for correctness (false positive = retry, not data loss), but could lead to unnecessary commit failures when many small partial writes overlap in row ranges but target different columns. The current approach is a reasonable trade-off for simplicity and performance.
baseSnapshotId in DataEvolutionMergeIntoAction
Good fix — pinning baseSnapshotId ensures the scan and conflict check use a consistent view. The .withSnapshot(baseSnapshotId) on the scan in shuffleByFirstRowId prevents TOCTOU issues where new snapshots appear between job planning and execution.
Minor Suggestion
In DataEvolutionPartialWriteOperator, the baseSnapshotId field is declared as Long (boxed). Since it's always assigned from a primitive long and is never null, consider using long to avoid accidental NPE and unnecessary boxing.
Test Coverage
The unit tests cover the key scenarios well (disjoint columns, same columns, rename across schemas, null writeCols, merged ranges, binary search scan-past). The Spark IT test (concurrent merge with disjoint update columns) validates the end-to-end behavior nicely.
Overall: solid design, clean implementation, good test coverage.
Purpose
Currently, if we concurrently trigger columns update jobs for data evolution table, the later jobs will be failed:

This happens even we are updating different columns.
However, we could only forbid updating the same columns. This PR introduces a RowIdColumnConflictChecker and use binary search to accelerate check process.
BenchMark
I tested 100000 files with disjoint row range check against 100000 files with disjoint row range, each file contains 3 columns, the result is:
with binary search, the check cost is negligible.
Tests
UT Cases for Spark and Unit Tests for core.